Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Consolidate sort and external_sort #1596

Merged
merged 9 commits into from
Jan 21, 2022
Merged

Conversation

yjshen
Copy link
Member

@yjshen yjshen commented Jan 17, 2022

Which issue does this PR close?

Closes #1571

Rationale for this change

We should have only a single sort operator that does in-memory sorting if it has enough memory budget but then spills to disk if needed.

What changes are included in this PR?

  1. Remove in_mem_sort, buffer batches in memory, and do combine then sort when memory is insufficient and spill.
  2. Duplicate tests SortExec into ExternalSort, retain spill-related tests in ExternalSort.

Are there any user-facing changes?

No

@github-actions github-actions bot added the datafusion Changes in the datafusion crate label Jan 17, 2022
struct AggregatedMetricsSet {
intermediate: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
final_: Arc<std::sync::Mutex<Vec<ExecutionPlanMetricsSet>>>,
}
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Introduce AggregatedMetricsSet for ExternalSortExec since it may include multi partial sort. Each partial sort itself is SortPreservingMergeStream and has its own metrics set.

@@ -656,4 +774,187 @@ mod tests {

Ok(())
}

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy all tests from SortExec here.

self.index += 1;
Some(Ok(self.batches[self.index - 1].as_ref().clone()))
} else {
None
})
});
self.baseline_metrics.record_poll(poll)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make metrics right when there is only one record batch.

@alamb
Copy link
Contributor

alamb commented Jan 17, 2022

Thank you @yjshen -- this looks awesome -- I will try and review this carefully, but likely won't have time until tomorrow

@yjshen
Copy link
Member Author

yjshen commented Jan 18, 2022

1. [bench] sort_limit_query_sql

cargo criterion --bench sort_limit_query_sql

No noticeable difference between this branch with which it originates:

W/ this PR:

sort_and_limit_by_int   time:   [3.3633 ms 3.4736 ms 3.6268 ms]                                   

sort_and_limit_by_float time:   [3.3644 ms 3.4726 ms 3.6342 ms]

sort_and_limit_lex_by_int                                                                             
                        time:   [3.7431 ms 4.1084 ms 4.6626 ms]

sort_and_limit_lex_by_string                                                                             
                        time:   [3.4665 ms 3.6071 ms 3.7919 ms]

W/o this PR:

sort_and_limit_by_int   time:   [3.3156 ms 3.3392 ms 3.3626 ms]

sort_and_limit_by_float time:   [3.2272 ms 3.6257 ms 4.3373 ms]

sort_and_limit_lex_by_int
                        time:   [3.4235 ms 3.4393 ms 3.4558 ms]

sort_and_limit_lex_by_string
                        time:   [3.3962 ms 3.4127 ms 3.4298 ms]

2. TPC-H sf=1 sort_extendedprice_discount

Three times slower for the external_sort compared to the previous sort.

./target/release/tpch benchmark datafusion --path ./data --format tbl --query 1 --batch-size 10240 --partitions 1

I changed q1 locally to run directly from tpch program to:

select
    l_returnflag,
    l_linestatus,
    l_quantity,
    l_extendedprice,
    l_discount,
    l_tax
from
    lineitem
order by
    l_extendedprice,
    l_discount;

query plan:

SortExec: [l_extendedprice@3 ASC NULLS LAST,l_discount@4 ASC NULLS LAST]
  ProjectionExec: expr=[l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax]
    CsvExec: files=[./data/lineitem.tbl], has_header=false, limit=None

W/ this PR:

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 3, partitions: 1, batch_size: 10240, path: "./data", file_format: "tbl", mem_table: false }
Query 1 iteration 0 took 35683.2 ms
Query 1 iteration 1 took 32783.6 ms
Query 1 iteration 2 took 32709.3 ms
Query 1 avg time: 33725.36 ms

W/o this PR:

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 3, partitions: 1, batch_size: 10240, path: "./data", file_format: "tbl", mem_table: false }
Query 1 iteration 0 took 8675.6 ms
Query 1 iteration 1 took 7833.2 ms
Query 1 iteration 2 took 8046.0 ms
Query 1 avg time: 8184.96 ms

@yjshen
Copy link
Member Author

yjshen commented Jan 18, 2022

After a quick investigation with flamegraph:

Flamegraph with this PR:
flamegraph_svg

more than 70% of the time is spent on SortKeyCursor::compare, which deteriorates to item-wise comparison, compared to Arrow sort kernel which is compared on Array.

only nearly 40% of the time is sort related:
flamegraph_svg

@yjshen
Copy link
Member Author

yjshen commented Jan 18, 2022

The logic for ExternalSort:

  1. get batch from input, sort it, and buffer it in memory
  2. when memory threshold meet, do "N-way merge" and spill the results to file
  3. repeat 1 and 2 until input is exhausted.
  4. another "N-way merge": merge current in-mem buffered batches as well as all spills, to get the total order.

Currently, I've unified all "N-way merge" into SortPreservingMergeStream. Inside SPMS, I construct a min-heap size of "N" to minimize merging memory consumption. However, the performance deterioration of item-wise heap compare seems to overweight its benefits of memory saving according to the TPC-H lineitem sort results.

The original sort algorithm is to combine_batches first and then sort the single batch. However, this suffers too much extra memory usage for "combine" first, which doubles the memory usage I think is not acceptable.

Therefore, I think it is probably worth trying https://github.com/jorgecarleitao/arrow2/blob/main/src/compute/merge_sort/mod.rs or bringing a new memory-efficient sort without combining first.

cc @alamb @jorgecarleitao @houqp @tustvold.

@alamb
Copy link
Contributor

alamb commented Jan 18, 2022

Starting to check this out @yjshen -- thank you for the detailed analysis so far

@alamb
Copy link
Contributor

alamb commented Jan 18, 2022

more than 70% of the time is spent on SortKeyCursor::compare, which deteriorates to item-wise comparison, compared to Arrow sort kernel which is compared on Array.

There is a bunch of other overhead in that column I suspect we could avoid with some additional engineering (follow on PRs) such as the use of an RWLock to defer the comparator initialization, for example

Copy link
Contributor

@alamb alamb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@yjshen 🏅🏅🏅🏅 -- this is really nice. Thank you so much.

The original sort algorithm is to combine_batches first and then sort the single batch. However, this suffers too much extra memory usage for "combine" first, which doubles the memory usage I think is not acceptable.

I would phrase this as "we can improve significantly" and the merge-sort pointer in arrow2 is a great potential place to look;

Given that datafusion master already this "double memory" usage behavior, I think it is acceptable to keep the same behavior and improve as a follow on PR.

Thus, here is how I suggest we proceed:

Change ExternalSorter to buffer input batches and then call combine_batches and sort if there is sufficient memory (aka do the same thing as master does, so there are no performance regressions) and then merge this code in

As follow on work, we file tickets to:

  1. Improve the memory efficiency of sorting when no spilling is required (e.g. avoid the doubling required by combine_batches, use parallelized merge sort, etc)
  2. Improve the performance of SortKeyCursor (e.g. get rid of RWLock, etc)
  3. Add additional tests for when the data actually spills, etc

datafusion/src/physical_plan/explain.rs Show resolved Hide resolved
datafusion/tests/sql/joins.rs Show resolved Hide resolved
datafusion/src/physical_plan/sorts/mod.rs Outdated Show resolved Hide resolved
datafusion/src/physical_plan/sorts/mod.rs Show resolved Hide resolved
datafusion/src/physical_plan/sorts/sort.rs Outdated Show resolved Hide resolved
datafusion/src/physical_plan/sorts/sort.rs Outdated Show resolved Hide resolved
metrics,
inner_metrics,
used: AtomicUsize::new(0),
spilled_bytes: AtomicUsize::new(0),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be neat to add spilled bytes to the metrics reported by ExternalSorter so that they appeared in EXPLAIN ANALYZE for example. Perhaps as a follow on task

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, filed #1611 to track this.

datafusion/src/physical_plan/sorts/sort.rs Outdated Show resolved Hide resolved
@yjshen
Copy link
Member Author

yjshen commented Jan 19, 2022

TPC-H sf=1 sort_extendedprice_discount. [combine_and_sort method]

Similar performance for the new sort compared with the previous sort.

./target/release/tpch benchmark datafusion --path ./data --format tbl --query 1 --batch-size 10240 --partitions 1

I changed q1 locally to run directly from tpch program to:

select
    l_returnflag,
    l_linestatus,
    l_quantity,
    l_extendedprice,
    l_discount,
    l_tax
from
    lineitem
order by
    l_extendedprice,
    l_discount;

query plan:

SortExec: [l_extendedprice@3 ASC NULLS LAST,l_discount@4 ASC NULLS LAST]
  ProjectionExec: expr=[l_returnflag@4 as l_returnflag, l_linestatus@5 as l_linestatus, l_quantity@0 as l_quantity, l_extendedprice@1 as l_extendedprice, l_discount@2 as l_discount, l_tax@3 as l_tax]
    CsvExec: files=[./data/lineitem.tbl], has_header=false, limit=None

W/ this PR:

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 3, partitions: 1, batch_size: 10240, path: "./data", file_format: "tbl", mem_table: false }
Query 1 iteration 0 took 7239.6 ms
Query 1 iteration 1 took 7357.7 ms
Query 1 iteration 2 took 6668.1 ms
Query 1 avg time: 7088.46 ms

W/o this PR:

Running benchmarks with the following options: DataFusionBenchmarkOpt { query: 1, debug: false, iterations: 3, partitions: 1, batch_size: 10240, path: "./data", file_format: "tbl", mem_table: false }
Query 1 iteration 0 took 7135.3 ms
Query 1 iteration 1 took 7462.2 ms
Query 1 iteration 2 took 7484.8 ms
Query 1 avg time: 7360.79 ms

@yjshen yjshen changed the title Consolidate sort and external_sort, consolidate N-way merge sort Consolidate sort and external_sort Jan 19, 2022
@yjshen yjshen requested a review from alamb January 19, 2022 09:37
Copy link
Contributor

@tustvold tustvold left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is a really nice improvement, happy to see SortPreservingStream being used more 😄

That being said, I think there is a fair bit more async, locking and atomic shenanigans than I would expect to be necessary. Not sure this makes a huge performance impact, but it certainly makes the code harder to reason about.

I've left some comments around the place, other than a deadlock I think these can possibly be addressed in subsequent PRs if you would prefer, up to you 😄

datafusion/tests/sql/joins.rs Show resolved Hide resolved
datafusion/src/physical_plan/sorts/sort.rs Outdated Show resolved Hide resolved
use futures::stream::Stream;
use futures::Future;
use pin_project_lite::pin_project;
use futures::lock::Mutex;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it should be possible to de-async the stream constructor to allow using parking_lot which is both lighter weight, and avoids the absolute brain melt that are async locks (they're also a monumental pain to debug)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think parking_lot is not used in DataFusion, I once use that but removed it later yjshen@6679628. Do you think we can add this dependency?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I thought it was, std::Mutex then. I more just meant a non-async Mutex 😁


let mut spills = self.spills.lock().await;
let used = self.used.swap(0, Ordering::SeqCst);
self.spilled_count.fetch_add(1, Ordering::SeqCst);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

FWIW two separate atomic increments with SeqCst are likely slower than a single uncontended mutex

datafusion/src/physical_plan/sorts/sort.rs Show resolved Hide resolved
datafusion/src/physical_plan/sorts/sort.rs Outdated Show resolved Hide resolved
datafusion/src/physical_plan/sorts/sort.rs Outdated Show resolved Hide resolved
datafusion/src/physical_plan/sorts/sort.rs Outdated Show resolved Hide resolved
}

/// MergeSort in mem batches as well as spills into total order with `SortPreservingMergeStream`.
async fn sort(&self) -> Result<SendableRecordBatchStream> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand that as ExternalSorter implements MemoryConsumer directly we need to wrap it in an Arc, but just an observation that the interface and implementation of this component would be simpler if it took mutable references... Can insert_batch be called after sort, if so what happens? What about concurrently?

Maybe something to think about, the borrow checker can only help you if you don't go behind its back with Mutex, RefCell and similar 😆

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry I cannot quite follow here. 🤔 I think although async, rust would compile do_sort into a state machine, run as the sequence in it?

async fn do_sort(
    mut input: SendableRecordBatchStream,
    partition_id: usize,
    expr: Vec<PhysicalSortExpr>,
    metrics: AggregatedMetricsSet,
    runtime: Arc<RuntimeEnv>,
) -> Result<SendableRecordBatchStream> {
    let schema = input.schema();
    let sorter = Arc::new(ExternalSorter::new(
        partition_id,
        schema.clone(),
        expr,
        metrics,
        runtime.clone(),
    ));
    runtime.register_consumer(&(sorter.clone() as Arc<dyn MemoryConsumer>));

    while let Some(batch) = input.next().await {
        let batch = batch?;
        sorter.insert_batch(batch).await?;
    }

    let result = sorter.sort().await;
    runtime.drop_consumer(sorter.id());
    result
}

Is that possible sort executed before insert_batch() in the while let? How does concurrently affect this?

path: String,
schema: SchemaRef,
) -> Result<usize> {
let mut writer = IPCWriter::new(path.as_ref(), schema.as_ref())?;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More as an FYI for @alamb but poking around here IPCWriter uses dictionary IDs and will error if batches with different IDs are written. This will likely cause problems with the way arrow-rs, and IOx, currently handle dictionaries... I've created apache/arrow-rs#1206 to clarify what is going on here

@yjshen
Copy link
Member Author

yjshen commented Jan 19, 2022

Thanks @tustvold for the detailed review. I've first quick fix the async and documentation-related comments. For deadlock / mutex / hangup parts, let me think it over again.

@alamb
Copy link
Contributor

alamb commented Jan 19, 2022

@yjshen is this PR ready for another review, or are you planning more changes to it?

@yjshen
Copy link
Member Author

yjshen commented Jan 20, 2022

@alamb I find some of my messages for you are lost, maybe because of my unstable network.

Yes, I think the PR is ready for another round of review now.

  1. for SortKeyCursor:

AtomicUsize is introduced for sharing the SortKeyCursors in a BinaryHeap and a Vec of cursors. I can think of deduplicating its usage and only keeping it in heap, but let me do in a follow-up PR.. #1624
batch_comparators: RwLock it's required to make SortKeyCursor impl PartialOrd and Ord, since the original cmp in SortKeyCursor has the mut signature, but Ord needs it to be immutable. Do you have any suggestions on simplifying this?

  1. for AggregatedMetricsSet:

I don't really understand this code -- it appears to be merging in new (zero'd) metrics to sef.all_metrics`? Why not just return self.all_metrics ?

This means aggregating all metrics during a complex operation, composed of multiple steps, and each step reports its statistics separately. For the sort case here: when the dataset is more significant than available memory, it will report multiple in-mem sort metrics and final merge-sort metrics from SortPreservingMergeStream.
Therefore, We need a separation of metrics: final metrics (for output_rows accumulation), and intermediate metrics that we only account for elapsed_compute time. So I create a zero'd metrics and gather elapse_time and output_rows separately.

@alamb
Copy link
Contributor

alamb commented Jan 20, 2022

Thanks @yjshen -- I'll put this on my review queue for first thing tomorrow

@@ -304,6 +303,9 @@ pub(crate) struct SortPreservingMergeStream {
/// An index to uniquely identify the input stream batch
next_batch_index: usize,

/// min heap for record comparison
min_heap: BinaryHeap<Arc<SortKeyCursor>>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think while not ideal, we can clean this up in a follow on PR

@alamb
Copy link
Contributor

alamb commented Jan 21, 2022

FYI @houqp @Dandandan @liukun4515 @andygrove -- while @yjshen has done performance analysis and this change should not affect the performance of queries that stay all in memory, there is some possibility of regression. Let us know if you see some and we'll investigate.

@alamb alamb merged commit 7d819d1 into apache:master Jan 21, 2022
@alamb
Copy link
Contributor

alamb commented Jan 21, 2022

Epic work @yjshen -- thanks again for all the work and thank you @tustvold for the help reviewing

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
datafusion Changes in the datafusion crate
Projects
None yet
3 participants